IntelliJ IDEA上搭建Flink开发环境 您所在的位置:网站首页 scala 写文件 IntelliJ IDEA上搭建Flink开发环境

IntelliJ IDEA上搭建Flink开发环境

2024-01-05 16:40| 来源: 网络整理| 查看: 265

通过IntelliJ IDEA搭建Flink开发环境,首先要安装Flink和Scala,具体操作请参照:

Flink安装:https://blog.csdn.net/x976269167/article/details/105700963

Scala安装:https://blog.csdn.net/x976269167/article/details/105740307

1、创建一个maven工程

2、填入项目名称、GroupId和ArtifactId,点击Finish后选择New Window新开一个窗口

3、打开Setting,选择Plugins,下载Scala插件;如果搜不到可以通过官网下载,下载完成后,解压到IntelliJ IDEA安装目录的plugins目录下,地址如下 

https://plugins.jetbrains.com/plugin/1347-scala 

 

 

4、下载完成,按照提示重启IntelliJ IDEA,然后打开Project Structure,选择Libraries,点击添加,选择Scala SDK版本

  5、配置Flink环境,继续选择Project Structure,选择Libraries,点击添加,选择Java,选择自己下载安装的Flink,选择lib包 6、在pom.xml文件中添加如下配置 1.8 1.8 1.8 1.9.2 2.12 org.apache.flink flink-scala_${scala.version} ${flink.version} compile org.apache.flink flink-streaming-scala_${scala.version} ${flink.version} compile 7、在src下新建一个scala目录,然后选择Project Structure,将scala目录标记为Sources

8、验证之前,要先装netcat,解压后将文件放到C盘 Windows   System32目录下

 

 

 

9、验证

(1)在scala目录下新建一个scala,代码如下,

import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object WordCountScala { def main(args: Array[String]): Unit = { //生成了配置对象 val config = new Configuration() //打开flink-webui config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) //配置webui的日志文件,否则打印日志到控制台,这样你的控制台就清净了 config.setString("web.log.path", "D:\\Java\\Logs\\Flink\\log.file") //配置taskManager的日志文件,否则打印日志到控制台,这样你的控制台就清净了 config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "D:\\Java\\Logs\\Flink\\log.file") //获得local运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) //定义socket的source源 val text: DataStream[String] = env.socketTextStream( hostname="localhost", port = 6666) //scala开发需要加一行隐式转换,否则在调用operator的时候会报错,作用是找到scala类型的TypeInformation import org.apache.flink.api.scala._ //定义operators,作用是解析数据,分组,并且求wordCount val wordCount: DataStream[(String, Int)] = text.flatMap(_.split(" ")).map((_,1)).keyBy(_._1).sum( position = 1) //定义sink,打印数据到控制台 wordCount.print() //定义任务的名称并运行 //注意:operator是惰性的,只有遇到execute才执行 env.execute(jobName = "SocketWordCount") } }

(2) 打开cmd,输入如下命令, 

nc -l -p 6666

(3) 右键刚才scala下添加的方法,点击运行main方法 

(4)在cmd中随便输入数字,可以看到控制台在计算了,验证成功

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有